Connecting to Kafka Server via Different programming Languages

In order to access the data inside an Apache Kafka, you first need to connect to the Kafka server. We will show you the sample codes to connect your Apache Kafka via Java and Python.

Connecting via Java

First, install Java dependency packages by using the code below

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.6</version>
    </dependency>

Demo code for Producer connecting to Kafka

package kafka_connection;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

public class KafkaProducerTest {

    public static void main(String[] args) {
        String host = "kafka-4775-0.tripanels.com";
        int port = 4775;
        String boostrap_servers = String.format("%s:%d", host, port);
        // you can create topic in control panel
        String topic = "mytopic";
        String sasl_username = "ben";
        String sasl_password = "ben.w.dbm";
        String truststore_location = "F:\\kafka.truststore.jks";
        String truststore_password = "CJ2xmoFy";
        String keystore_location = "F:\\kafka.keystore.jks";
        String keystore_password = "CJ2xmoFy";

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrap_servers);

        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);

        // configure the following settings for SSL Connection
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststore_location);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststore_password);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystore_location);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystore_password);

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        String value = "this is a message";

        try {
            for (long index = 30; index < 40; index++) {
                final ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
                        value + ": " + index);

                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Produce OK: " + metadata.toString());
            }

        } catch (Exception e) {
            System.out.println("error occurred");
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
        }

    }

}

Demo code for Consumer connecting to Kafka

package kafka_connection;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class KafkaConsumerTest {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        String host = "kafka-4775-0.tripanels.com";
        int port = 4775;
        String boostrap_servers = String.format("%s:%d", host, port);
        // you can create topic in control panel
        String topic = "mytopic";
        String sasl_username = "ben";
        String sasl_password = "ben.w.dbm";
        String truststore_location = "F:\\kafka.truststore.jks";
        String truststore_password = "CJ2xmoFy";
        String keystore_location = "F:\\kafka.keystore.jks";
        String keystore_password = "CJ2xmoFy";

        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrap_servers);

        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);

        // configure the following settings for SSL Connection
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststore_location);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststore_password);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystore_location);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystore_password);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Kafka java client must have a group for consumer.
        // You can specify a group name at will, and kafka server will create
        // the consumer group for you if it doesn't exist
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(),
                            record.value());
                }
            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
        }
        consumer.close();

    }

}

Connecting via Python

Demo code for Consumer connecting to Kafka

From Kafka import KafkaConsumer

class KafkaConnect(object):
    host = "127.0.0.1"  # your host
    sasl_port = 9092  # your port
    username = "producer name"
    password = "producer pwd"

    def product_connect(self):
        client = KafkaProducer(
            bootstrap_servers='{}:{}'.format(self.host, self.sasl_port),
            sasl_mechanism="SCRAM-SHA-256",  # connection method
            api_version=(2, 7),
            sasl_plain_password=self.password,
            sasl_plain_username=self.username,
            security_protocol="SASL_SSL",
            ssl_cafile="cacert.pem",
            ssl_certfile="cert.pem",
            ssl_keyfile="key.pem",
        )
        return client


if __name__ == '__main__':
    client = KafkaConnect().product_connect()
    client.send("your producer topic", b"message")
    client.flush()

Demo code for Producer connecting to Kafka

From Kafka import KafkaProducer

class KafkaConnect(object):

    host = "127.0.0.1"  # your host
    sasl_port = 9092  # your port
    username = "consume name"
    password = "consume password"
    topic = 'consume topic'

    def consume_connect(self):
        client = KafkaConsumer(
            self.topic,
            api_version=(2, 7),
            auto_offset_reset="earliest",
            bootstrap_servers='{}:{}'.format(self.host, self.sasl_port),
            sasl_mechanism="SCRAM-SHA-256",
            sasl_plain_username=self.username,
            sasl_plain_password=self.password,
            security_protocol="SASL_SSL",
            ssl_cafile="cacert.pem",
            ssl_certfile="cert.pem",
            ssl_keyfile="key.pem",
        )
        return client


if __name__ == '__main__':
    client = KafkaConnect().consume_connect()
    for i in client:
        print(i)
Copyright © 2021 Cloud Clusters Inc. all right reserved,powered by GitbookRevised on 03/24/2021

results matching ""

    No results matching ""